//
// The Commit Protocol takes as input a set of produce requests and commits the
// associated batches. This involves (1) building and uploading the batch data
// to cloud storage as an L0 object, (2) committing metadata about the L0 object
// into each partition targeted by the produce requests, and (3) acknowledging
// each client with the final committed offset.
//

// An L0 object stores payload data from produced batches without offsets.
// In the model, the position of an entry in an L0 object is used as its
// identifier in placeholder batches. The values in the L0 object are unique
// batch identifiers generated by the client. They represent the "content" of
// the batch, and must not be used by the model except for validation.
type L0_object = seq[int];

machine CommitProtocol {
  var storage: Storage;
  var epoch_service: EpochService;
  var requests: seq[produce_request];

  // the L0 object that will be built and uploaded
  var object: L0_object;
  // identifier of the uploaded L0 object, as returned by the storage service
  var object_id: int;

  // mapping from batch offset in the L0 object to the index of its source
  // request in `requests`
  var request_by_object_offset: seq[int];
  // mapping from placeholder append request id to the offset of its batch in
  // the L0 object
  var object_offset_by_request_id: map[int, int];

  // holds the next request id; a single counter is used for every request
  // this machine sends (epoch sample, upload, and placeholder appends)
  var request_id: int;

  // holds the epoch for the L0 object (-1 until sampled)
  var epoch: int;

  start state Init {
    entry (input: (storage: Storage, epoch_service: EpochService, requests: seq[produce_request])) {
      storage = input.storage;
      epoch_service = input.epoch_service;
      requests = input.requests;
      epoch = -1;
      goto Commit;
    }
  }

  state Commit {
    // Run the protocol steps in order: sample the epoch, build and upload the
    // L0 object, then append one placeholder batch per batch in the object.
    // Responses to the placeholder appends are handled below.
    entry {
      get_epoch();
      assert epoch >= 0;

      build_L0();
      upload_L0();
      append_placeholders();
    }

    // Respond to the produce request with the committed offset.
    on append_response_event do (response: append_response) {
      var batch_idx: int;
      var request_idx: int;
      var request: produce_request;

      // Map the partition append response back to the original produce
      // request. First the placeholder append request id is mapped back to
      // its batch, expressed as an offset within the L0 object. That offset is
      // then mapped back to the originating produce request using the
      // `request_by_object_offset` shuffle index.
      assert response.request_id in object_offset_by_request_id,
        "append response does not match any placeholder append request";
      batch_idx = object_offset_by_request_id[response.request_id];
      request_idx = request_by_object_offset[batch_idx];
      request = requests[request_idx];

      send request.source, produce_response_event, (
        request_id = request.request_id,
        offset = response.offset);
    }
  }

  // Get the epoch for the L0 object. This is done by sampling the epoch
  // service directly. In a real system we will cache the epoch instead of
  // performing RPC for every operation. The model captures this property
  // because the epoch service does not increment the epoch on every read.
  fun get_epoch() {
    send epoch_service, get_epoch_request_event, (
      source = this,
      request_id = request_id);

    // block until the epoch service answers this specific request
    receive {
      case get_epoch_response_event: (response: get_epoch_response) {
        assert response.request_id == request_id;
        epoch = response.epoch;
      }
    }

    request_id = request_id + 1;
  }

  // Build the L0 object from the input produce requests. Batches in the L0
  // object may have any ordering as long as the request_by_object_offset
  // mapping maintains the property that for each offset in the object, the
  // value of request_by_object_offset[offset] is the index of the source
  // request in `requests`.
  //
  // More detail about request_by_object_offset: currently the order of batches
  // in the L0 object is the same as the ordering of their respective produce
  // requests in the `requests` sequence, which means that only the offset is
  // necessary to correlate items between the data structures. This means that
  // currently `request_by_object_offset` is effectively an identity function.
  // However, in general this ordering need not hold, and we may want to model
  // other organizations of data in the L0 object. The `request_by_object_offset`
  // index allows the freedom to organize L0 contents by providing a mapping
  // from object offset back to the original request index. See the
  // `append_response_event` handler for its use.
  fun build_L0() {
    var request_idx: int;

    // request_idx starts at the P default value for int (0)
    while (request_idx < sizeof(requests)) {
      object += (sizeof(object), requests[request_idx].batch_id);
      request_by_object_offset += (sizeof(request_by_object_offset), request_idx);
      request_idx = request_idx + 1;
    }
  }

  // Upload the L0 object. For convenience and without loss of generality, the
  // storage service, rather than the broker, generates a globally unique
  // identifier for the object. The object is uploaded into the group named by
  // the sampled epoch.
  fun upload_L0() {
    send storage, put_group_request_event, (
      source = this,
      request_id = request_id,
      group = epoch,
      object = object);

    // block until the storage service acknowledges this specific upload
    receive {
      case put_response_event: (response: put_response) {
        assert response.request_id == request_id;
        object_id = response.object_id;
      }
    }

    request_id = request_id + 1;
  }

  // Append placeholder batches to every partition associated with data
  // contained in the L0 object. The `object_offset_by_request_id` index maps
  // each placeholder append request id to the offset of its source batch in
  // the L0 object, so that responses can be routed back to the client.
  fun append_placeholders() {
    var batch_idx: int;
    var request: produce_request;

    // batch_idx starts at the P default value for int (0)
    while (batch_idx < sizeof(object)) {
      object_offset_by_request_id[request_id] = batch_idx;
      request = requests[request_by_object_offset[batch_idx]];

      // the placeholder carries only a reference into the L0 object, not data
      send request.partition, append_request_event, (
        source = this,
        request_id = request_id,
        batch = (L0_object_id = object_id, L0_offset = batch_idx));

      request_id = request_id + 1;
      batch_idx = batch_idx + 1;
    }
  }
}
